
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as f

def split_line(data: str) -> list:
    """Split a whitespace-delimited line into its tokens.

    Args:
        data: input line whose words are separated by single spaces.

    Returns:
        The space-separated tokens, per ``str.split(" ")`` semantics
        (an empty string yields ``[""]``).
    """
    return data.split(" ")


if __name__ == '__main__':
    # Local SparkSession using all cores; app name kept from the original demo.
    spark = SparkSession.builder.appName("test1_dataFrame_create")\
        .master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # Two single-column rows; each row holds one space-separated line of words.
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = spark.createDataFrame(rdd, ["line"])
    df.show()

    # Approach 1: spark.udf.register creates a UDF usable from SQL (via the
    # registered name "udf1") AND from the DataFrame DSL (via the returned
    # callable udf2).
    udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))
    df.selectExpr("udf1(line)").show(truncate=False)
    df.select(udf2(df["line"])).show(truncate=False)

    # The SQL-registered name also works in full spark.sql queries
    # against a temp view.
    df.createOrReplaceTempView("lines")
    spark.sql("""
    select udf1(line) 
    from lines
    """).show(truncate=False)

    # Approach 2: pyspark.sql.functions.udf builds a DSL-only UDF
    # (not visible to SQL by name).
    udf3 = f.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df["line"])).show(truncate=False)

    # Release the session's resources; the original script leaked it.
    spark.stop()








